# /-----------------------------\     /-------------------------------\
# | Main R process              |     | Subprocess 1                  |
# | +------------------------+  |     | +---------------------------+ |
# | | test_dir_parallel()    |  |     | | test_file()               | |
# | | +-------------------+  |  |     | | +-----------------------+ | |
# | | | Event loop        |< ------+  | | | SubprocessReporter    | | |
# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
# | |    |                   |  |  |  | | | | test_that()       | | | |
# | |    v                   |  |  |  | | | +-------------------+ | | |
# | | +-------------------+  |  |  |  | | |    |                  | | |
# | | | Progress2Reporter |  |  |  |  | | |    v                  | | |
# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
# | +------------------------+  |  |--------| signalCondition() | | | |
# \-----------------------------/  |  | | | +-------------------+ | | |
#                                  |  | | +-----------------------+ | |
#                                  |  | +---------------------------+ |
#                                  |  \-------------------------------/
#                                  |  /-------------------------------\
#                                  |--| Subprocess 2                  |
#                                  |  \-------------------------------/
#                                  |  /-------------------------------\
#                                  \--| Subprocess 3                  |
#                                     \-------------------------------/
#
# ## Notes
#
# * Subprocesses run `callr::r_session` R sessions. They are re-used,
#   one R session can be used for several test_file() calls.
# * Helper and setup files are loaded in the subprocesses after this.
# * The main process puts all test files in the task queue, and then
#   runs an event loop.

test_files_parallel <- function(
  test_dir,
  test_package,
  test_paths,
  load_helpers = TRUE,
  reporter = default_parallel_reporter(),
  env = NULL,
  stop_on_failure = FALSE,
  stop_on_warning = FALSE,
  wrap = TRUE, # unused, to match test_files signature
  load_package = c("none", "installed", "source"),
  shuffle = FALSE
) {
  # TODO: support timeouts. 20-30s for each file by default?

  num_workers <- min(default_num_cpus(), length(test_paths))
  cli::cli_inform("Starting {num_workers} test process{?es}.")

  # Set up work queue ------------------------------------------
  queue <- NULL
  withr::defer(queue_teardown(queue))

  # Start workers in parallel and add test tasks to queue.
  queue <- queue_setup(
    test_paths = test_paths,
    test_package = test_package,
    test_dir = test_dir,
    load_helpers = load_helpers,
    num_workers = num_workers,
    load_package = load_package,
    shuffle = shuffle
  )

  withr::with_dir(test_dir, {
    reporters <- test_files_reporter(reporter, "parallel")
    with_reporter(reporters$multi, {
      parallel_updates <- reporter$capabilities$parallel_updates
      if (parallel_updates) {
        parallel_event_loop_smooth(queue, reporters, ".")
      } else {
        parallel_event_loop_chunky(queue, reporters, ".")
      }
    })

    test_files_check(
      reporters$list$get_results(),
      stop_on_failure = stop_on_failure,
      stop_on_warning = stop_on_warning
    )
  })
}


default_num_cpus <- function() {
  # Use common option, if set
  ncpus <- getOption("Ncpus", NULL)
  if (!is.null(ncpus)) {
    ncpus <- suppressWarnings(as.integer(ncpus))
    if (is.na(ncpus)) {
      cli::cli_abort(
        "{.code getOption('Ncpus')} must be an integer.",
        call = NULL
      )
    }
    return(ncpus)
  }

  # Otherwise use env var if set
  ncpus <- Sys.getenv("TESTTHAT_CPUS", "")
  if (ncpus != "") {
    ncpus <- suppressWarnings(as.integer(ncpus))
    if (is.na(ncpus)) {
      cli::cli_abort("{.envvar TESTTHAT_CPUS} must be an integer.")
    }
    return(ncpus)
  }

  # Otherwise 2
  2L
}

parallel_event_loop_smooth <- function(queue, reporters, test_dir) {
  update_interval <- 0.1
  next_update <- proc.time()[[3]] + update_interval

  while (!queue$is_idle()) {
    # How much time do we have to poll before the next UI update?
    now <- proc.time()[[3]]
    poll_time <- max(next_update - now, 0)
    next_update <- now + update_interval

    msgs <- queue$poll(poll_time)

    updated <- FALSE
    for (x in msgs) {
      if (x$code == PROCESS_OUTPUT) {
        lns <- paste0("> ", x$path, ": ", x$message)
        cat("\n", file = stdout())
        base::writeLines(lns, stdout())
        next
      }
      if (x$code != PROCESS_MSG) {
        next
      }

      m <- x$message
      if (!inherits(m, "testthat_message")) {
        cli::cli_inform(as.character(m))
        next
      }

      if (m$cmd != "DONE") {
        reporters$multi$start_file(m$filename)
        reporters$multi$start_test(m$context, m$test)
        if (m$type == "snapshotter") {
          snapshotter <- getOption("testthat.snapshotter")
          do.call(snapshotter[[m$cmd]], m$args)
        } else {
          do.call(reporters$multi[[m$cmd]], m$args)
          updated <- TRUE
        }
      }
    }

    # We need to spin, even if there were no events
    if (!updated) {
      reporters$multi$update()
    }
  }
}

parallel_event_loop_chunky <- function(queue, reporters, test_dir) {
  files <- list()
  while (!queue$is_idle()) {
    msgs <- queue$poll(Inf)
    for (x in msgs) {
      if (x$code == PROCESS_OUTPUT) {
        lns <- paste0("> ", x$path, ": ", x$message)
        base::writeLines(lns, stdout())
        next
      }
      if (x$code != PROCESS_MSG) {
        next
      }

      m <- x$message
      if (!inherits(m, "testthat_message")) {
        cli::cli_inform(as.character(m))
        next
      }

      # Record all events until we get end of file, then we replay them all
      # with the local reporters. This prevents out of order reporting.
      if (m$cmd != "DONE") {
        files[[m$filename]] <- append(files[[m$filename]], list(m))
      } else {
        replay_events(reporters$multi, files[[m$filename]])
        reporters$multi$end_context_if_started()
        files[[m$filename]] <- NULL
      }
    }
  }
}

replay_events <- function(reporter, events) {
  snapshotter <- getOption("testthat.snapshotter")
  for (m in events) {
    if (m$type == "snapshotter") {
      do.call(snapshotter[[m$cmd]], m$args)
    } else {
      do.call(reporter[[m$cmd]], m$args)
    }
  }
}

queue_setup <- function(
  test_paths,
  test_package,
  test_dir,
  num_workers,
  load_helpers,
  load_package,
  shuffle = FALSE
) {
  # TODO: observe `load_package`, but the "none" default is not
  # OK for the subprocess, because it'll not have the tested package
  if (load_package == "none") {
    load_package <- "source"
  }

  # TODO: similarly, load_helpers = FALSE, coming from devtools,
  # is not appropriate in the subprocess
  load_helpers <- TRUE

  test_package <- test_package %||% Sys.getenv("TESTTHAT_PKG")

  hyperlinks <- cli::ansi_has_hyperlink_support()

  # First we load the package "manually", in case it is testthat itself
  load_hook <- expr({
    switch(
      !!load_package,
      installed = library(!!test_package, character.only = TRUE),
      source = pkgload::load_all(!!test_dir, helpers = FALSE, quiet = TRUE)
    )

    # Ensure snapshot can generate hyperlinks for snapshot_accept()
    options(
      cli.hyperlink = !!hyperlinks,
      cli.hyperlink_run = !!hyperlinks
    )

    asNamespace("testthat")$queue_process_setup(
      test_package = !!test_package,
      test_dir = !!test_dir,
      load_helpers = !!load_helpers,
      load_package = "none"
    )
  })
  queue <- task_q$new(concurrency = num_workers, load_hook = load_hook)

  fun <- transport_fun(function(path, shuffle) {
    asNamespace("testthat")$queue_task(path, shuffle)
  })
  for (path in test_paths) {
    queue$push(fun, list(path, shuffle))
  }

  queue
}

queue_process_setup <- function(
  test_package,
  test_dir,
  load_helpers,
  load_package
) {
  env <- asNamespace("testthat")$test_files_setup_env(
    test_package,
    test_dir,
    load_package
  )

  asNamespace("testthat")$test_files_setup_state(
    test_dir = test_dir,
    test_package = test_package,
    load_helpers = load_helpers,
    env = env,
    frame = .GlobalEnv
  )

  # record testing env for mocks & queue_task
  # manual implementation of local_testing_env()
  the$testing_env <- env
}

queue_task <- function(path, shuffle = FALSE) {
  withr::local_envvar("TESTTHAT_IS_PARALLEL" = "true")
  snapshotter <- SubprocessSnapshotReporter$new(snap_dir = "_snaps")
  withr::local_options(testthat.snapshotter = snapshotter)
  reporters <- list(
    SubprocessReporter$new(),
    snapshotter
  )
  multi <- MultiReporter$new(reporters = reporters)
  with_reporter(
    multi,
    test_one_file(path, env = the$testing_env, shuffle = shuffle)
  )
  NULL
}

# Clean up subprocesses: we call teardown methods, but we only give them a
# second, before killing the whole process tree using ps's env var marker
# method.
queue_teardown <- function(queue) {
  if (is.null(queue)) {
    return()
  }

  tasks <- queue$list_tasks()
  num <- nrow(tasks)

  # calling quit() here creates a race condition, and the output of
  # the deferred_run() might be lost. Instead we close the input
  # connection in a separate task.
  clean_fn <- function() {
    withr::deferred_run(.GlobalEnv)
  }

  topoll <- integer()
  for (i in seq_len(num)) {
    if (
      !is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
    ) {
      # The worker might have crashed or exited, so this might fail.
      # If it does then we'll just ignore that worker
      tryCatch(
        {
          tasks$worker[[i]]$call(clean_fn)
          topoll <- c(topoll, i)
        },
        error = function(e) NULL
      )
    }
  }

  # Give covr a bit more time
  if (in_covr()) {
    grace <- 30L
  } else {
    grace <- 1L
  }
  first_error <- NULL
  limit <- Sys.time() + grace
  while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
    timeout <- as.double(timeout, units = "secs") * 1000
    conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
    pr <- unlist(processx::poll(conns, as.integer(timeout)))
    for (i in which(pr == "ready")) {
      msg <- tasks$worker[[topoll[i]]]$read()
      first_error <- first_error %||% msg$error
    }
    topoll <- topoll[pr != "ready"]
  }

  topoll <- integer()
  for (i in seq_len(num)) {
    if (
      !is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
    ) {
      tryCatch(
        {
          close(tasks$worker[[i]]$get_input_connection())
          topoll <- c(topoll, i)
        },
        error = function(e) NULL
      )
    }
  }

  limit <- Sys.time() + grace
  while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
    timeout <- as.double(timeout, units = "secs") * 1000
    conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
    pr <- unlist(processx::poll(conns, as.integer(timeout)))
    topoll <- topoll[pr != "ready"]
  }

  for (i in seq_len(num)) {
    if (!is.null(tasks$worker[[i]])) {
      if (ps::ps_is_supported()) {
        tryCatch(tasks$worker[[i]]$kill_tree(), error = function(e) NULL)
      } else {
        tryCatch(tasks$worker[[i]]$kill(), error = function(e) NULL)
      }
    }
  }

  if (!is.null(first_error)) {
    cli::cli_abort(
      "At least one parallel worker failed to run teardown",
      parent = first_error
    )
  }
}

# Reporter that just forwards events in the subprocess back to the main process
#
# Ideally, these messages would be throttled, i.e. if the test code
# emits many expectation conditions fast, SubprocessReporter should
# collect several of them and only emit a condition a couple of times
# a second. End-of-test and end-of-file events would be transmitted
# immediately.
SubprocessReporter <- R6::R6Class(
  "SubprocessReporter",
  inherit = Reporter,
  public = list(
    start_file = function(filename) {
      private$filename <- filename
      private$event("start_file", filename)
    },
    start_test = function(context, test) {
      private$context <- context
      private$test <- test
      private$event("start_test", context, test)
    },
    start_context = function(context) {
      private$context <- context
      private$event("start_context", context)
    },
    add_result = function(context, test, result) {
      if (inherits(result, "expectation_success")) {
        # Strip bulky components to reduce data transfer cost
        result[["srcref"]] <- NULL
        result[["trace"]] <- NULL
      }
      private$event("add_result", context, test, result)
    },
    end_test = function(context, test) {
      private$event("end_test", context, test)
    },
    end_context = function(context) {
      private$event("end_context", context)
    },
    end_file = function() {
      private$event("end_file")
    },
    end_reporter = function() {
      private$event("DONE")
    }
  ),

  private = list(
    filename = NULL,
    context = NULL,
    test = NULL,
    event = function(cmd, ...) {
      msg <- list(
        code = PROCESS_MSG,
        type = "reporter",
        cmd = cmd,
        filename = private$filename,
        context = private$context,
        test = private$test,
        time = proc.time()[[3]],
        args = list(...)
      )
      class(msg) <- c("testthat_message", "callr_message", "condition")
      signalCondition(msg)
    }
  )
)
